package com.fly.demo.service;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import com.fly.demo.service.thread.PingCallable;
import com.fly.demo.service.thread.PingRunable;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
@Service
public class NodeService
{
    @Value("${listOfServers:192.168.182.10,127.0.0.1}")
    private List<String> listOfServers;
    
    @Autowired
    private WebClient webClient;
    
    private ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new BasicThreadFactory.Builder().namingPattern("t-%03d").daemon(true).priority(Thread.MAX_PRIORITY).build());
    
    /**
     * 自动返回活跃节点
     * 
     * @return
     * @throws InterruptedException
     */
    public List<String> choose()
        throws InterruptedException
    {
        // TODO ArrayList 一定概率有null值，原因:通过new ArrayList<>()初始化的大小是0，首次插入触发扩容，并发可能导致出现null值
        // 线程检测活跃节点
        List<String> serverIp = new CopyOnWriteArrayList<String>();
        if (RandomUtils.nextBoolean())
        {
            listOfServers.stream().forEach(ip -> executorService.execute(new PingRunable(ip, serverIp)));
        }
        else // 匿名Runnable
        {
            listOfServers.stream()
                .forEach(ip -> executorService.execute(() -> webClient.get()
                    .uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port("2375").path("/_ping").build())// URI
                    .acceptCharset(StandardCharsets.UTF_8)
                    .accept(MediaType.APPLICATION_JSON)
                    .retrieve()
                    .bodyToMono(String.class)
                    .subscribe(resp -> {
                        log.info("serverIp: {} ===> resp: {}", ip, resp);
                        if (StringUtils.equals("OK", resp))
                        {
                            serverIp.add(ip);
                        }
                    })));
        }
        
        // 1000ms内未ping通，即认为不活跃
        int index = 0;
        while (serverIp.isEmpty() && (index++) < 100)
        {
            TimeUnit.MILLISECONDS.sleep(10);
            log.info("waitting......");
        }
        return serverIp;
    }
    
    /**
     * 返回全部活跃节点
     * 
     * @return
     * @throws InterruptedException
     */
    public List<String> invokeAll()
        throws InterruptedException
    {
        List<Future<String>> futures;
        switch (RandomUtils.nextInt(1, 4))
        {
            case 1:
                List<Callable<String>> tasks = new ArrayList<>();
                listOfServers.stream().forEach(ip -> tasks.add(new PingCallable(ip)));
                futures = executorService.invokeAll(tasks, 1000, TimeUnit.MILLISECONDS);
                break;
            
            case 2: // lambda写法
                futures = executorService.invokeAll(listOfServers.stream().map(ip -> new PingCallable(ip)).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS);
                break;
            
            case 3:
            default: // 匿名Callable
                futures = executorService.invokeAll(listOfServers.stream().map(ip -> new Callable<String>()
                {
                    @Override
                    public String call()
                    {
                        Mono<String> mono = webClient.get()
                            .uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port("2375").path("/_ping").build())// URI
                            .acceptCharset(StandardCharsets.UTF_8)
                            .accept(MediaType.APPLICATION_JSON)
                            .retrieve()
                            .bodyToMono(String.class);
                        return StringUtils.equals("OK", mono.block()) ? ip : null;
                    }
                }).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS);
                break;
        }
        
        // 汇总结果
        return futures.stream().map(f -> {
            try
            {
                return f.get();
            }
            // catch (InterruptedException | ExecutionException e) 这样写会抛出异常到返回
            catch (Exception e)
            {
                log.error(e.getMessage(), e);
                return null;
            }
        }).filter(StringUtils::isNotBlank).collect(Collectors.toList());
    }
    
    /**
     * 自动返回首个活跃节点
     * 
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     * @throws TimeoutException
     */
    public String invokeAny()
        throws InterruptedException, ExecutionException, TimeoutException
    {
        // 线程检测活跃节点
        switch (RandomUtils.nextInt(1, 4))
        {
            case 1:
                List<Callable<String>> tasks = new ArrayList<>();
                listOfServers.stream().forEach(ip -> tasks.add(new PingCallable(ip)));
                return executorService.invokeAny(tasks, 1000, TimeUnit.MILLISECONDS);
            
            case 2: // lambda写法
                return executorService.invokeAny(listOfServers.stream().map(ip -> new PingCallable(ip)).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS);
            
            case 3:
            default: // 匿名Callable
                return executorService.invokeAny(listOfServers.stream().map(ip -> new Callable<String>()
                {
                    @Override
                    public String call()
                    {
                        String rsp = webClient.get()
                            .uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port("2375").path("/_ping").build())// URI
                            .acceptCharset(StandardCharsets.UTF_8)
                            .accept(MediaType.APPLICATION_JSON)
                            .retrieve()
                            .bodyToMono(String.class)
                            .block();
                        return StringUtils.equals("OK", rsp) ? ip : null;
                    }
                }).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS);
        }
    }
}
